package com.xueyuan.wata.daph.node.flink117.string.general

import com.xueyuan.wata.daph.flink117.api.node.datastream.DataStreamNode
import com.xueyuan.wata.daph.flink117.constants.FlinkSqlConstants._
import com.xueyuan.wata.daph.flink117.enums.{CatalogDatabaseType, CatalogRangeType}
import com.xueyuan.wata.daph.flink117.utils.CatalogUtil
import com.xueyuan.wata.daph.flink117.utils.CatalogUtil.registerCatalog
import com.xueyuan.wata.daph.flink117.utils.FlinkUtil.setSqlDialect
import com.xueyuan.wata.daph.utils.SQLUtil
import org.apache.commons.lang3.StringUtils
import org.apache.flink.table.catalog.{Catalog, ObjectPath}

trait GeneralNode extends DataStreamNode {
  private var _catalog: Catalog = _

  def su(config: GeneralConfig): Catalog = {
    val catalogRangeType = config.catalogRangeType
    val catalogDatabaseType = config.catalogDatabaseType
    val catalogName = config.catalogName
    val catalogConfig = config.catalogConfig
    val databaseName = config.databaseName
    val sqlDialect = config.sqlDialect

    setDialect(sqlDialect)
    useCatalog(catalogRangeType, catalogDatabaseType, catalogName, catalogConfig)
    if (!StringUtils.isEmpty(databaseName)) useDatabase(databaseName)

    _catalog
  }

  def suDefault(): Unit = {
    setDialect(DIALECT_DEFAULT)
    tableEnv.useCatalog(NAME_DEFAULT_CATALOG)
    useDatabase(NAME_DEFAULT_DATABASE)
  }

  def setDialect(sqlDialect: String): Unit = setSqlDialect(tableEnv, sqlDialect)

  def useCatalog(catalogRangeType: String, catalogDatabaseType: String, catalogName: String, catalogConfig: Map[String, String] = Map.empty): Catalog = {
    val catalog = CatalogRangeType.withName(catalogRangeType) match {
      case CatalogRangeType.GLOBAL =>
        globalCatalog(catalogName)
      case CatalogRangeType.LOCAL =>
        val dbType = CatalogDatabaseType.withName(catalogDatabaseType)
        CatalogUtil.createCatalog(dbType, catalogConfig)
      case _ =>
        tableEnv.getCatalog(NAME_DEFAULT_CATALOG).get()
    }

    _catalog = catalog
    registerCatalog(tableEnv, catalogName, catalog)
    tableEnv.useCatalog(catalogName)
    _catalog
  }

  def useDatabase(databaseName: String): Unit = {
    tableEnv.useDatabase(databaseName)
  }

  def createTemporaryView(tableName: String, sqlStatement: String): Unit = {
    setDialect(DIALECT_DEFAULT)
    tableEnv.useCatalog(NAME_DEFAULT_CATALOG)
    tableEnv.useDatabase(NAME_DEFAULT_DATABASE)
    val table = tableEnv.sqlQuery(sqlStatement)
    tableEnv.createTemporaryView(tableName, table)
  }

  def createCatalogTable(databaseName: String, tableName: String, createSql: String): Unit = {
    val tableExists = _catalog.tableExists(new ObjectPath(databaseName, tableName))
    if (!tableExists) {
      tableEnv.executeSql(createSql)
    } else {
      logger.info(s"Table[${tableEnv.getCurrentCatalog}.$databaseName.$tableName] already exists.")
    }
  }

  def createTable(databaseName: String, createSql: String, resultSql: String): Unit = {
    if (!StringUtils.isEmpty(createSql)) {
      val tableName = SQLUtil.getTableNames(resultSql).head
      if (!StringUtils.isEmpty(databaseName)) {
        createCatalogTable(databaseName, tableName, createSql)
      } else {
        val dt = tableName.split("\\.")
        val dn = dt(1)
        val tn = dt(2)
        createCatalogTable(dn, tn, createSql)
      }
    } else {
      logger.info("createSql is empty.")
    }
  }
}